LSTM and GRU for Time Series Forecasting (from scratch + PyTorch)#

Recurrent neural networks model sequences by carrying state forward in time. LSTMs and GRUs add gates that make it much easier to learn long-range patterns (seasonality, delayed effects, regime changes) without gradients vanishing/exploding.

In this notebook you will:

  • Build intuition for memory + gates in LSTM/GRU

  • Implement a single-layer LSTM and single-layer GRU in pure NumPy (forward + backprop through time)

  • Train on a small forecasting task and visualize predictions

  • Reproduce the same setup using PyTorch (nn.LSTM, nn.GRU)

  • Visualize learned dynamics with Plotly

import os
import sys

import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
from plotly.subplots import make_subplots

import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset

pio.templates.default = "plotly_white"
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")

np.set_printoptions(precision=4, suppress=True)

SEED = 42
rng = np.random.default_rng(SEED)
torch.manual_seed(SEED)
import plotly

print("Python:", sys.version.split()[0])
print("NumPy:", np.__version__)
print("Plotly:", plotly.__version__)
print("PyTorch:", torch.__version__)
Python: 3.12.9
NumPy: 1.26.2
Plotly: 6.5.2
PyTorch: 2.7.0+cu126

0) Problem setup: one-step-ahead forecasting#

We’ll learn a mapping:

  • Input: a window \((x_{t-L+1}, \dots, x_t)\) of length \(L\)

  • Target: the next value \(x_{t+1}\)

This is the simplest useful forecasting setup and keeps the math in the from-scratch section readable.
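To make the indexing concrete, here is the same windowing on a tiny toy array (an illustrative sketch; the `make_windows` function defined below does the same thing and also returns target indices):

```python
import numpy as np

series = np.arange(6.0)  # [0, 1, 2, 3, 4, 5]
L = 3  # window length

# Each length-L window is paired with the value that immediately follows it.
X = np.stack([series[s : s + L] for s in range(len(series) - L)])
y = series[L:]

print(X)  # windows: [0 1 2], [1 2 3], [2 3 4]
print(y)  # targets: [3. 4. 5.]
```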

def make_toy_series(n: int = 800, noise: float = 0.15, seed: int = 42):
    rng_local = np.random.default_rng(seed)
    t = np.arange(n, dtype=float)

    # A mix of seasonality + trend + noise (toy, but non-trivial)
    y = (
        1.2 * np.sin(2 * np.pi * t / 50)
        + 0.6 * np.sin(2 * np.pi * t / 17 + 0.8)
        + 0.0025 * t
    )
    y += noise * rng_local.normal(size=n)
    return t, y


def make_windows(series_1d: np.ndarray, seq_len: int):
    X = []
    y = []
    target_idx = []
    for start in range(0, len(series_1d) - seq_len):
        end = start + seq_len
        X.append(series_1d[start:end])
        y.append(series_1d[end])
        target_idx.append(end)

    X = np.asarray(X, dtype=float)[:, :, None]  # (N, T, 1)
    y = np.asarray(y, dtype=float)[:, None]  # (N, 1)
    target_idx = np.asarray(target_idx, dtype=int)
    return X, y, target_idx


t, y = make_toy_series(n=900, noise=0.18, seed=SEED)

split_t = int(0.7 * len(y))

train_mean = float(y[:split_t].mean())
train_std = float(y[:split_t].std() + 1e-8)

y_scaled = (y - train_mean) / train_std

SEQ_LEN = 60
X_all, y_all, target_idx = make_windows(y_scaled, seq_len=SEQ_LEN)

train_mask = target_idx < split_t
test_mask = ~train_mask

X_train, y_train = X_all[train_mask], y_all[train_mask]
X_test, y_test = X_all[test_mask], y_all[test_mask]

t_train = t[target_idx[train_mask]]
t_test = t[target_idx[test_mask]]

X_train.shape, y_train.shape, X_test.shape, y_test.shape
((570, 60, 1), (570, 1), (270, 60, 1), (270, 1))
fig = go.Figure()
fig.add_trace(go.Scatter(x=t, y=y, mode="lines", name="series"))
fig.add_vline(x=t[split_t], line_dash="dash", line_color="black")
fig.update_layout(
    title="Toy time series (train/test split)",
    xaxis_title="time index",
    yaxis_title="value",
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()

1) Why gated RNNs (LSTM/GRU) help#

A vanilla RNN is:

\[ h_t = \tanh(W_x x_t + W_h h_{t-1} + b), \]

and gradients backpropagate through many repeated Jacobian multiplications. Over long sequences this often leads to:

  • vanishing gradients: early time steps get almost no learning signal

  • exploding gradients: unstable updates without clipping

Gates make it easier for the model to choose what to keep, overwrite, or expose at each step.
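A quick scalar sketch (illustrative values, not part of any model above) makes the vanishing problem tangible: for \(h_t = \tanh(w h_{t-1} + x_t)\), each backprop step multiplies by \(\partial h_t / \partial h_{t-1} = w (1 - h_t^2)\), and with moderate \(|w|\) the product shrinks geometrically over the sequence:

```python
import numpy as np

rng = np.random.default_rng(0)
w, T = 0.9, 100  # recurrent weight and sequence length (toy values)

h = 0.0
grad = 1.0  # accumulates dh_T / dh_0 via the chain rule
for _ in range(T):
    h = np.tanh(w * h + rng.normal())
    grad *= w * (1.0 - h**2)  # per-step Jacobian of the tanh recurrence

print(f"|dh_T/dh_0| after {T} steps: {abs(grad):.3e}")  # vanishingly small
```

Since every factor is at most \(|w| = 0.9\), the gradient is bounded by \(0.9^{100} \approx 2.7 \times 10^{-5}\) — the earliest inputs effectively receive no learning signal.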

2) LSTM: explicit memory cell#

An LSTM keeps two states:

  • cell state \(c_t\) (long-term memory)

  • hidden state \(h_t\) (what the model exposes to the next layer / output)

With \(z_t = [x_t, h_{t-1}]\) (concatenation), the core equations are:

\[ \begin{aligned} i_t &= \sigma(z_t W_i + b_i) && \text{(input gate)} \\ f_t &= \sigma(z_t W_f + b_f) && \text{(forget gate)} \\ o_t &= \sigma(z_t W_o + b_o) && \text{(output gate)} \\ g_t &= \tanh(z_t W_g + b_g) && \text{(candidate)} \\ c_t &= f_t \odot c_{t-1} + i_t \odot g_t \\ h_t &= o_t \odot \tanh(c_t) \end{aligned} \]

Intuition:

  • \(f_t\) chooses what memory to keep from \(c_{t-1}\)

  • \(i_t\) chooses how much new information to write

  • \(o_t\) chooses how much memory to expose to \(h_t\)
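A single LSTM step, written directly from the equations above (a minimal unbatched sketch with made-up small dimensions; the full batched implementation with backprop follows in section 4):

```python
import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

D, H = 1, 4  # input and hidden sizes (illustrative)
rng = np.random.default_rng(0)
Ws = {k: rng.normal(scale=0.3, size=(D + H, H)) for k in "ifog"}
bs = {k: np.zeros(H) for k in "ifog"}

def lstm_step(x_t, h_prev, c_prev):
    z = np.concatenate([x_t, h_prev])       # z_t = [x_t, h_{t-1}]
    i = sigmoid(z @ Ws["i"] + bs["i"])      # input gate
    f = sigmoid(z @ Ws["f"] + bs["f"])      # forget gate
    o = sigmoid(z @ Ws["o"] + bs["o"])      # output gate
    g = np.tanh(z @ Ws["g"] + bs["g"])      # candidate
    c = f * c_prev + i * g                  # new cell state
    h = o * np.tanh(c)                      # new hidden state
    return h, c

h, c = lstm_step(np.array([0.5]), np.zeros(H), np.zeros(H))
print(h.shape, c.shape)  # (4,) (4,)
```

Note that \(|h_t| < 1\) always holds, since both \(o_t\) and \(\tanh(c_t)\) lie in \((-1, 1)\).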

3) GRU: fewer gates, no separate cell state#

A GRU merges the cell and hidden state and uses two gates:

\[ \begin{aligned} z_t &= \sigma([x_t, h_{t-1}] W_z + b_z) && \text{(update)} \\ r_t &= \sigma([x_t, h_{t-1}] W_r + b_r) && \text{(reset)} \\ \tilde{h}_t &= \tanh([x_t, r_t \odot h_{t-1}] W_h + b_h) \\ h_t &= (1 - z_t) \odot h_{t-1} + z_t \odot \tilde{h}_t \end{aligned} \]

Intuition:

  • \(z_t\) interpolates between keeping the old state vs writing the candidate

  • \(r_t\) decides how much of the old state to use when forming the candidate
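The update gate's interpolation is worth seeing in isolation (toy values, purely illustrative): when \(z_t = 0\) the GRU copies its state unchanged, which is exactly the behavior that preserves long-range information; when \(z_t = 1\) it overwrites the state with the candidate:

```python
import numpy as np

h_prev = np.array([0.3, -0.7, 0.1])   # previous hidden state
h_tilde = np.array([0.9, 0.9, 0.9])   # some candidate state

for z_val in (0.0, 0.5, 1.0):
    z = np.full_like(h_prev, z_val)
    h = (1.0 - z) * h_prev + z * h_tilde  # the GRU state update
    print(f"z={z_val}: h={h}")
# z=0 keeps h_prev exactly; z=1 replaces it with the candidate.
```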

4) NumPy from scratch (forward + BPTT)#

We’ll implement two tiny forecasters:

  • NumpyLSTMForecaster

  • NumpyGRUForecaster

Both:

  • take X shaped (batch, seq_len, 1)

  • run a single recurrent layer

  • predict the next value via a linear head on the last hidden state

We’ll also add:

  • Adam optimizer (NumPy)

  • gradient clipping (important for RNNs)

def sigmoid(x: np.ndarray) -> np.ndarray:
    x = np.clip(x, -60, 60)
    return 1.0 / (1.0 + np.exp(-x))


def mse_loss(y_hat: np.ndarray, y: np.ndarray):
    diff = y_hat - y
    loss = float(np.mean(diff**2))
    dY = (2.0 / diff.size) * diff
    return loss, dY


def clip_grads_(grads: dict[str, np.ndarray], max_norm: float = 1.0) -> float:
    total_sq = 0.0
    for g in grads.values():
        total_sq += float(np.sum(g * g))
    total_norm = float(np.sqrt(total_sq))

    if total_norm > max_norm:
        scale = max_norm / (total_norm + 1e-12)
        for k in grads:
            grads[k] *= scale

    return total_norm


def init_weight(rng: np.random.Generator, fan_in: int, fan_out: int):
    # Xavier/Glorot uniform
    limit = np.sqrt(6.0 / (fan_in + fan_out))
    return rng.uniform(-limit, limit, size=(fan_in, fan_out)).astype(float)


class Adam:
    def __init__(
        self,
        params: dict[str, np.ndarray],
        lr: float = 1e-3,
        betas: tuple[float, float] = (0.9, 0.999),
        eps: float = 1e-8,
    ):
        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.t = 0
        self.m = {k: np.zeros_like(v) for k, v in params.items()}
        self.v = {k: np.zeros_like(v) for k, v in params.items()}

    def step(self, params: dict[str, np.ndarray], grads: dict[str, np.ndarray]):
        self.t += 1
        lr_t = self.lr * (np.sqrt(1.0 - self.beta2**self.t) / (1.0 - self.beta1**self.t))

        for k in params:
            g = grads[k]
            self.m[k] = self.beta1 * self.m[k] + (1.0 - self.beta1) * g
            self.v[k] = self.beta2 * self.v[k] + (1.0 - self.beta2) * (g * g)
            params[k] -= lr_t * self.m[k] / (np.sqrt(self.v[k]) + self.eps)

4.1) NumPy LSTM forecaster#

This is a single-layer LSTM with a linear prediction head.

Implementation notes:

  • We store per-timestep intermediates (gates, states, concatenated input) in a cache.

  • Backprop is standard BPTT over that cache.

class NumpyLSTMForecaster:
    def __init__(self, input_size: int, hidden_size: int, seed: int = 42):
        self.input_size = input_size
        self.hidden_size = hidden_size

        rng_local = np.random.default_rng(seed)
        fan_in = input_size + hidden_size

        self.params = {
            'W_i': init_weight(rng_local, fan_in, hidden_size),
            'b_i': np.zeros((hidden_size,), dtype=float),
            'W_f': init_weight(rng_local, fan_in, hidden_size),
            'b_f': np.zeros((hidden_size,), dtype=float),
            'W_o': init_weight(rng_local, fan_in, hidden_size),
            'b_o': np.zeros((hidden_size,), dtype=float),
            'W_g': init_weight(rng_local, fan_in, hidden_size),
            'b_g': np.zeros((hidden_size,), dtype=float),
            'W_y': init_weight(rng_local, hidden_size, 1),
            'b_y': np.zeros((1,), dtype=float),
        }

        self.cache = None
        self.last_h = None

    def forward(self, X: np.ndarray) -> np.ndarray:
        # X: (B, T, D)
        B, T, D = X.shape
        H = self.hidden_size

        h = np.zeros((B, H), dtype=float)
        c = np.zeros((B, H), dtype=float)

        caches = []
        p = self.params

        for t in range(T):
            x_t = X[:, t, :]
            z = np.concatenate([x_t, h], axis=1)  # (B, D+H)

            i = sigmoid(z @ p['W_i'] + p['b_i'])
            f = sigmoid(z @ p['W_f'] + p['b_f'])
            o = sigmoid(z @ p['W_o'] + p['b_o'])
            g = np.tanh(z @ p['W_g'] + p['b_g'])

            c_next = f * c + i * g
            tanh_c = np.tanh(c_next)
            h_next = o * tanh_c

            caches.append(
                {
                    'z': z,
                    'i': i,
                    'f': f,
                    'o': o,
                    'g': g,
                    'c_prev': c,
                    'c': c_next,
                    'tanh_c': tanh_c,
                }
            )

            h, c = h_next, c_next

        y_hat = h @ p['W_y'] + p['b_y']
        self.cache = caches
        self.last_h = h
        return y_hat

    def backward(self, dY: np.ndarray) -> dict[str, np.ndarray]:
        assert self.cache is not None and self.last_h is not None

        p = self.params
        grads = {k: np.zeros_like(v) for k, v in p.items()}

        grads['W_y'] = self.last_h.T @ dY
        grads['b_y'] = np.sum(dY, axis=0)

        dh = dY @ p['W_y'].T
        dc = np.zeros_like(dh)

        D = self.input_size

        for step in reversed(self.cache):
            z = step['z']
            i = step['i']
            f = step['f']
            o = step['o']
            g = step['g']
            c_prev = step['c_prev']
            c = step['c']
            tanh_c = step['tanh_c']

            # h = o * tanh(c)
            do = dh * tanh_c
            dc_total = dc + dh * o * (1.0 - tanh_c * tanh_c)

            # c = f*c_prev + i*g
            df = dc_total * c_prev
            di = dc_total * g
            dg = dc_total * i
            dc = dc_total * f

            # gate pre-activations
            dai = di * i * (1.0 - i)
            daf = df * f * (1.0 - f)
            dao = do * o * (1.0 - o)
            dag = dg * (1.0 - g * g)

            grads['W_i'] += z.T @ dai
            grads['b_i'] += np.sum(dai, axis=0)

            grads['W_f'] += z.T @ daf
            grads['b_f'] += np.sum(daf, axis=0)

            grads['W_o'] += z.T @ dao
            grads['b_o'] += np.sum(dao, axis=0)

            grads['W_g'] += z.T @ dag
            grads['b_g'] += np.sum(dag, axis=0)

            dz = (
                dai @ p['W_i'].T
                + daf @ p['W_f'].T
                + dao @ p['W_o'].T
                + dag @ p['W_g'].T
            )

            dh = dz[:, D:]  # propagate to previous hidden state

        return grads
def train_numpy_model(
    model,
    X_train: np.ndarray,
    y_train: np.ndarray,
    *,
    epochs: int = 120,
    batch_size: int = 64,
    lr: float = 2e-3,
    max_grad_norm: float = 1.0,
    seed: int = 42,
    verbose_every: int = 20,
):
    rng_local = np.random.default_rng(seed)
    opt = Adam(model.params, lr=lr)

    history = []
    n = len(X_train)

    for epoch in range(1, epochs + 1):
        idx = rng_local.permutation(n)
        total_loss = 0.0

        for start in range(0, n, batch_size):
            bidx = idx[start : start + batch_size]
            Xb = X_train[bidx]
            yb = y_train[bidx]

            y_hat = model.forward(Xb)
            loss, dY = mse_loss(y_hat, yb)

            grads = model.backward(dY)
            grad_norm = clip_grads_(grads, max_norm=max_grad_norm)
            opt.step(model.params, grads)

            total_loss += loss * len(bidx)

        avg = total_loss / n
        history.append(avg)

        if epoch == 1 or epoch % verbose_every == 0 or epoch == epochs:
            # grad_norm shown is from the final mini-batch of the epoch
            print(f"epoch {epoch:>4}/{epochs} | loss={avg:.6f} | grad_norm={grad_norm:.3f}")

    return np.asarray(history, dtype=float)


def predict_numpy(model, X: np.ndarray) -> np.ndarray:
    return model.forward(X)


def unscale(x_scaled: np.ndarray) -> np.ndarray:
    return x_scaled * train_std + train_mean
FAST_RUN = True

EPOCHS_NUMPY = 80 if FAST_RUN else 200
HIDDEN = 32

lstm_np = NumpyLSTMForecaster(input_size=1, hidden_size=HIDDEN, seed=SEED)

hist_lstm_np = train_numpy_model(
    lstm_np,
    X_train,
    y_train,
    epochs=EPOCHS_NUMPY,
    batch_size=64,
    lr=2e-3,
    max_grad_norm=1.0,
    seed=SEED,
    verbose_every=20,
)

fig = px.line(y=hist_lstm_np, title="NumPy LSTM: training loss")
fig.update_layout(xaxis_title="epoch", yaxis_title="MSE")
fig.show()
epoch    1/80 | loss=0.949535 | grad_norm=1.457
epoch   20/80 | loss=0.063779 | grad_norm=0.180
epoch   40/80 | loss=0.046973 | grad_norm=0.392
epoch   60/80 | loss=0.046404 | grad_norm=0.322
epoch   80/80 | loss=0.038749 | grad_norm=0.299
y_pred_test_lstm = unscale(predict_numpy(lstm_np, X_test))[:, 0]
y_true_test = unscale(y_test)[:, 0]

fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_true_test, mode="lines", name="true"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_lstm, mode="lines", name="NumPy LSTM"))
fig.update_layout(
    title="One-step-ahead predictions on the test region (NumPy LSTM)",
    xaxis_title="time index",
    yaxis_title="value",
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()

mse = float(np.mean((y_pred_test_lstm - y_true_test) ** 2))
mae = float(np.mean(np.abs(y_pred_test_lstm - y_true_test)))
print(f"Test MSE: {mse:.4f}")
print(f"Test MAE: {mae:.4f}")
Test MSE: 0.1178
Test MAE: 0.2782
def rollout_forecast_numpy(model, seed_window_scaled_1d: np.ndarray, steps: int):
    window = seed_window_scaled_1d.astype(float).copy()
    preds = []
    for _ in range(steps):
        y_hat = model.forward(window[None, :, None])[0, 0]
        preds.append(float(y_hat))
        window = np.concatenate([window[1:], [y_hat]])
    return np.asarray(preds, dtype=float)


# Roll out recursively from the first test window
roll_steps = 200
seed_window = y_scaled[split_t - SEQ_LEN : split_t]
future_true = y[split_t : split_t + roll_steps]

future_pred_scaled = rollout_forecast_numpy(lstm_np, seed_window, steps=roll_steps)
future_pred = unscale(future_pred_scaled)

fig = go.Figure()
fig.add_trace(go.Scatter(x=t[split_t : split_t + roll_steps], y=future_true, mode="lines", name="true"))
fig.add_trace(
    go.Scatter(
        x=t[split_t : split_t + roll_steps],
        y=future_pred,
        mode="lines",
        name="NumPy LSTM (rollout)",
    )
)
fig.update_layout(
    title="Recursive rollout forecast (NumPy LSTM)",
    xaxis_title="time index",
    yaxis_title="value",
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()

Visualizing LSTM gates#

To make the LSTM less of a black box, we can inspect the gate activations for a single input window.

The plots below show hidden units (rows) × time steps (columns).

def lstm_gate_mats(model: NumpyLSTMForecaster, X_one: np.ndarray):
    _ = model.forward(X_one)
    assert model.cache is not None

    i = np.stack([step['i'][0] for step in model.cache], axis=0).T
    f = np.stack([step['f'][0] for step in model.cache], axis=0).T
    o = np.stack([step['o'][0] for step in model.cache], axis=0).T
    g = np.stack([step['g'][0] for step in model.cache], axis=0).T
    c = np.stack([step['c'][0] for step in model.cache], axis=0).T

    return i, f, o, g, c


X_one = X_test[:1]
i, f, o, g, c = lstm_gate_mats(lstm_np, X_one)

fig = make_subplots(
    rows=2,
    cols=2,
    subplot_titles=["input gate i", "forget gate f", "output gate o", "cell state c"],
    horizontal_spacing=0.08,
    vertical_spacing=0.12,
)

fig.add_trace(go.Heatmap(z=i, coloraxis="coloraxis"), row=1, col=1)
fig.add_trace(go.Heatmap(z=f, coloraxis="coloraxis"), row=1, col=2)
fig.add_trace(go.Heatmap(z=o, coloraxis="coloraxis"), row=2, col=1)
fig.add_trace(go.Heatmap(z=c, coloraxis="coloraxis"), row=2, col=2)

fig.update_layout(
    title="NumPy LSTM: gate activations for one window",
    height=750,
    coloraxis=dict(colorscale="Viridis"),
)
fig.update_xaxes(title_text="time step")
fig.update_yaxes(title_text="hidden unit")
fig.show()

4.2) NumPy GRU forecaster#

The GRU is a bit simpler than the LSTM: fewer gates and no separate cell state.

class NumpyGRUForecaster:
    def __init__(self, input_size: int, hidden_size: int, seed: int = 42):
        self.input_size = input_size
        self.hidden_size = hidden_size

        rng_local = np.random.default_rng(seed)
        fan_in = input_size + hidden_size

        self.params = {
            'W_z': init_weight(rng_local, fan_in, hidden_size),
            'b_z': np.zeros((hidden_size,), dtype=float),
            'W_r': init_weight(rng_local, fan_in, hidden_size),
            'b_r': np.zeros((hidden_size,), dtype=float),
            'W_h': init_weight(rng_local, fan_in, hidden_size),
            'b_h': np.zeros((hidden_size,), dtype=float),
            'W_y': init_weight(rng_local, hidden_size, 1),
            'b_y': np.zeros((1,), dtype=float),
        }

        self.cache = None
        self.last_h = None

    def forward(self, X: np.ndarray) -> np.ndarray:
        B, T, D = X.shape
        H = self.hidden_size

        h = np.zeros((B, H), dtype=float)
        caches = []
        p = self.params

        for t in range(T):
            x_t = X[:, t, :]
            concat = np.concatenate([x_t, h], axis=1)

            z = sigmoid(concat @ p['W_z'] + p['b_z'])
            r = sigmoid(concat @ p['W_r'] + p['b_r'])

            concat2 = np.concatenate([x_t, r * h], axis=1)
            h_tilde = np.tanh(concat2 @ p['W_h'] + p['b_h'])

            h_next = (1.0 - z) * h + z * h_tilde

            caches.append(
                {
                    'x_t': x_t,
                    'h_prev': h,
                    'concat': concat,
                    'concat2': concat2,
                    'z': z,
                    'r': r,
                    'h_tilde': h_tilde,
                }
            )
            h = h_next

        y_hat = h @ p['W_y'] + p['b_y']
        self.cache = caches
        self.last_h = h
        return y_hat

    def backward(self, dY: np.ndarray) -> dict[str, np.ndarray]:
        assert self.cache is not None and self.last_h is not None

        p = self.params
        grads = {k: np.zeros_like(v) for k, v in p.items()}

        grads['W_y'] = self.last_h.T @ dY
        grads['b_y'] = np.sum(dY, axis=0)

        dh = dY @ p['W_y'].T
        D = self.input_size

        for step in reversed(self.cache):
            h_prev = step['h_prev']
            concat = step['concat']
            concat2 = step['concat2']
            z = step['z']
            r = step['r']
            h_tilde = step['h_tilde']

            # h = (1-z)*h_prev + z*h_tilde
            dh_tilde = dh * z
            dz = dh * (h_tilde - h_prev)
            dh_prev = dh * (1.0 - z)

            # h_tilde = tanh(concat2 @ W_h + b_h)
            da_h = dh_tilde * (1.0 - h_tilde * h_tilde)
            grads['W_h'] += concat2.T @ da_h
            grads['b_h'] += np.sum(da_h, axis=0)

            dconcat2 = da_h @ p['W_h'].T
            drh_prev = dconcat2[:, D:]

            dr = drh_prev * h_prev
            dh_prev += drh_prev * r

            # r gate
            da_r = dr * r * (1.0 - r)
            grads['W_r'] += concat.T @ da_r
            grads['b_r'] += np.sum(da_r, axis=0)
            dconcat_r = da_r @ p['W_r'].T

            # z gate
            da_z = dz * z * (1.0 - z)
            grads['W_z'] += concat.T @ da_z
            grads['b_z'] += np.sum(da_z, axis=0)
            dconcat_z = da_z @ p['W_z'].T

            dconcat = dconcat_r + dconcat_z
            dh_prev += dconcat[:, D:]

            dh = dh_prev

        return grads
gru_np = NumpyGRUForecaster(input_size=1, hidden_size=HIDDEN, seed=SEED)

hist_gru_np = train_numpy_model(
    gru_np,
    X_train,
    y_train,
    epochs=EPOCHS_NUMPY,
    batch_size=64,
    lr=2e-3,
    max_grad_norm=1.0,
    seed=SEED,
    verbose_every=20,
)

fig = px.line(y=hist_gru_np, title="NumPy GRU: training loss")
fig.update_layout(xaxis_title="epoch", yaxis_title="MSE")
fig.show()
epoch    1/80 | loss=0.750055 | grad_norm=2.316
epoch   20/80 | loss=0.064718 | grad_norm=0.312
epoch   40/80 | loss=0.047928 | grad_norm=0.152
epoch   60/80 | loss=0.046996 | grad_norm=0.416
epoch   80/80 | loss=0.038497 | grad_norm=0.290
y_pred_test_gru = unscale(predict_numpy(gru_np, X_test))[:, 0]

fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_true_test, mode="lines", name="true"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_gru, mode="lines", name="NumPy GRU"))
fig.update_layout(
    title="One-step-ahead predictions on the test region (NumPy GRU)",
    xaxis_title="time index",
    yaxis_title="value",
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()

mse = float(np.mean((y_pred_test_gru - y_true_test) ** 2))
mae = float(np.mean(np.abs(y_pred_test_gru - y_true_test)))
print(f"Test MSE: {mse:.4f}")
print(f"Test MAE: {mae:.4f}")
Test MSE: 0.0882
Test MAE: 0.2428

Visualizing GRU gates#

We’ll inspect:

  • update gate \(z_t\)

  • reset gate \(r_t\)

def gru_gate_mats(model: NumpyGRUForecaster, X_one: np.ndarray):
    _ = model.forward(X_one)
    assert model.cache is not None

    z = np.stack([step['z'][0] for step in model.cache], axis=0).T
    r = np.stack([step['r'][0] for step in model.cache], axis=0).T
    h_tilde = np.stack([step['h_tilde'][0] for step in model.cache], axis=0).T
    return z, r, h_tilde


z, r, h_tilde = gru_gate_mats(gru_np, X_one)

fig = make_subplots(
    rows=1,
    cols=3,
    subplot_titles=["update gate z", "reset gate r", "candidate h~"],
    horizontal_spacing=0.06,
)
fig.add_trace(go.Heatmap(z=z, coloraxis="coloraxis"), row=1, col=1)
fig.add_trace(go.Heatmap(z=r, coloraxis="coloraxis"), row=1, col=2)
fig.add_trace(go.Heatmap(z=h_tilde, coloraxis="coloraxis"), row=1, col=3)
fig.update_layout(
    title="NumPy GRU: gate activations for one window",
    height=380,
    coloraxis=dict(colorscale="Viridis"),
)
fig.update_xaxes(title_text="time step")
fig.update_yaxes(title_text="hidden unit")
fig.show()

5) PyTorch: the same idea with nn.LSTM / nn.GRU#

PyTorch takes care of:

  • parameter initialization

  • BPTT + autograd

  • efficient kernels

We’ll keep the exact same dataset and model shape to make the comparison fair.

class WindowDataset(Dataset):
    def __init__(self, X: np.ndarray, y: np.ndarray):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self) -> int:
        return self.X.shape[0]

    def __getitem__(self, idx: int):
        return self.X[idx], self.y[idx]


class TorchRNNForecaster(nn.Module):
    def __init__(self, rnn_type: str, input_size: int = 1, hidden_size: int = 32):
        super().__init__()
        rnn_type = rnn_type.lower()
        if rnn_type == 'lstm':
            self.rnn = nn.LSTM(input_size, hidden_size, batch_first=True)
        elif rnn_type == 'gru':
            self.rnn = nn.GRU(input_size, hidden_size, batch_first=True)
        else:
            raise ValueError("rnn_type must be 'lstm' or 'gru'")

        self.head = nn.Linear(hidden_size, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out, _ = self.rnn(x)
        h_last = out[:, -1, :]
        return self.head(h_last)


def train_torch_model(
    model: nn.Module,
    loader: DataLoader,
    *,
    epochs: int = 40,
    lr: float = 1e-3,
    max_grad_norm: float = 1.0,
    device: str = 'cpu',
    verbose_every: int = 10,
):
    model.to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    history = []

    for epoch in range(1, epochs + 1):
        model.train()
        total = 0.0

        for xb, yb in loader:
            xb = xb.to(device)
            yb = yb.to(device)

            opt.zero_grad(set_to_none=True)
            pred = model(xb)
            loss = loss_fn(pred, yb)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            opt.step()

            total += float(loss.item()) * xb.shape[0]

        avg = total / len(loader.dataset)
        history.append(avg)

        if epoch == 1 or epoch % verbose_every == 0 or epoch == epochs:
            print(f"epoch {epoch:>4}/{epochs} | loss={avg:.6f}")

    return np.asarray(history, dtype=float)


def predict_torch(model: nn.Module, X: np.ndarray, device: str = 'cpu') -> np.ndarray:
    model.eval()
    with torch.no_grad():
        preds = model(torch.tensor(X, dtype=torch.float32, device=device)).cpu().numpy()
    return preds
import warnings


def pick_device() -> str:
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message="CUDA initialization.*")
        try:
            if torch.cuda.is_available():
                return "cuda"
        except Exception:
            pass
    return "cpu"


device = pick_device()
print('device:', device)


EPOCHS_TORCH = 40 if FAST_RUN else 120

train_loader = DataLoader(WindowDataset(X_train, y_train), batch_size=64, shuffle=True)

torch_lstm = TorchRNNForecaster('lstm', input_size=1, hidden_size=HIDDEN)
torch_gru = TorchRNNForecaster('gru', input_size=1, hidden_size=HIDDEN)

hist_torch_lstm = train_torch_model(
    torch_lstm,
    train_loader,
    epochs=EPOCHS_TORCH,
    lr=1e-3,
    max_grad_norm=1.0,
    device=device,
    verbose_every=10,
)

hist_torch_gru = train_torch_model(
    torch_gru,
    train_loader,
    epochs=EPOCHS_TORCH,
    lr=1e-3,
    max_grad_norm=1.0,
    device=device,
    verbose_every=10,
)

fig = go.Figure()
fig.add_trace(go.Scatter(y=hist_lstm_np, mode='lines', name='NumPy LSTM'))
fig.add_trace(go.Scatter(y=hist_gru_np, mode='lines', name='NumPy GRU'))
fig.add_trace(go.Scatter(y=hist_torch_lstm, mode='lines', name='Torch LSTM'))
fig.add_trace(go.Scatter(y=hist_torch_gru, mode='lines', name='Torch GRU'))
fig.update_layout(title='Training loss comparison', xaxis_title='epoch', yaxis_title='MSE')
fig.show()
device: cpu
epoch    1/40 | loss=0.948046
epoch   10/40 | loss=0.205258
epoch   20/40 | loss=0.078450
epoch   30/40 | loss=0.058539
epoch   40/40 | loss=0.046161
epoch    1/40 | loss=0.924206
epoch   10/40 | loss=0.171874
epoch   20/40 | loss=0.076581
epoch   30/40 | loss=0.070679
epoch   40/40 | loss=0.067584
y_pred_test_torch_lstm = unscale(predict_torch(torch_lstm, X_test, device=device))[:, 0]
y_pred_test_torch_gru = unscale(predict_torch(torch_gru, X_test, device=device))[:, 0]

fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_true_test, mode="lines", name="true"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_lstm, mode="lines", name="NumPy LSTM"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_gru, mode="lines", name="NumPy GRU"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_torch_lstm, mode="lines", name="Torch LSTM"))
fig.add_trace(go.Scatter(x=t_test, y=y_pred_test_torch_gru, mode="lines", name="Torch GRU"))

fig.update_layout(
    title="One-step-ahead predictions (test region)",
    xaxis_title="time index",
    yaxis_title="value",
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)
fig.show()

6) Pitfalls + diagnostics (practical)#

  • Scale your data: training on an unscaled series is often unstable.

  • Exploding gradients: use gradient clipping (we did).

  • Sequence length: too short → misses long seasonality; too long → harder optimization.

  • Train/test split: split by time, not random shuffle.

  • Rollout error accumulation: one-step models can drift when rolled out recursively.

If you want a stronger forecaster:

  • predict multiple future steps (seq2seq)

  • add exogenous features (calendar, holidays, covariates)

  • use teacher forcing / scheduled sampling
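For example, the direct multi-step variant only changes the target construction: each window is paired with the next `horizon` values instead of one (a sketch mirroring `make_windows` above; the `horizon` parameter and function name are illustrative):

```python
import numpy as np

def make_multistep_windows(series_1d, seq_len, horizon):
    # Pair each length-seq_len window with the next `horizon` values.
    X, Y = [], []
    for start in range(len(series_1d) - seq_len - horizon + 1):
        end = start + seq_len
        X.append(series_1d[start:end])
        Y.append(series_1d[end : end + horizon])
    return np.asarray(X, dtype=float)[:, :, None], np.asarray(Y, dtype=float)

X, Y = make_multistep_windows(np.arange(20.0), seq_len=5, horizon=3)
print(X.shape, Y.shape)  # (13, 5, 1) (13, 3)
```

The prediction head then outputs `horizon` values (e.g. `nn.Linear(hidden_size, horizon)` in the PyTorch model) and is trained against `Y` directly, avoiding recursive error accumulation at the cost of a harder regression target.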

7) Exercises#

  1. Increase the noise and compare LSTM vs GRU stability.

  2. Add a second seasonal component with a longer period and test different SEQ_LEN.

  3. Predict multiple steps ahead (e.g. next 10 points) and compare direct vs recursive forecasting.

  4. Add dropout (PyTorch) and see how it affects rollout drift.

References#

  • Hochreiter & Schmidhuber (1997): Long Short-Term Memory

  • Cho et al. (2014): Learning Phrase Representations using RNN Encoder–Decoder (introduces GRU)

  • Goodfellow, Bengio, Courville: Deep Learning (sequence models chapters)